package com.hncc.es.util;

import com.ruoyi.common.constant.HttpStatus;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.common.core.page.TableSupport;
import com.ruoyi.common.utils.uuid.UUID;
import com.hncc.common.exception.ApiException;
import com.hncc.es.domain.EsIdable;
import io.jsonwebtoken.lang.Assert;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.List;

/**
 * 根据若依分页查询的TableDataInfo结果，导入数据到es
 *
 * @author 天天向上 （john.yi@qq.com）
 * @date 2020/8/26
 */
@Slf4j
public class EsImportTemplate<T> {
    /**
     * 当PAGE_SIZE大于BULK_SIZE时将统一使用bulkIndex导入
     * 否则使用JPA默认的saveAll导入
     */
    public static final int PAGE_SIZE = 2000;
    public static final int BULK_SIZE = 500;
    private RestTemplate restTemplate;
    private HttpHeaders httpHeaders;

    public EsImportTemplate() {

    }

    public EsImportTemplate(RestTemplate restTemplate, HttpHeaders httpHeaders) {
        this.restTemplate = restTemplate;
        this.httpHeaders = httpHeaders;
    }

    /**
     * 模板方法，批量保存数据到es
     *
     * @param apiUrl 远程调用api的url
     * @param callback 回调接口
     */
    public void batchSave(String apiUrl, BatchSaveCallback callback) {
        Assert.notNull(restTemplate, "restTemplate不能为空");
        Assert.notNull(httpHeaders, "httpHeaders不能为空");
        int totalPages;
        long total;
        int pageNum = 1;
        String className = "";
        do {
            String url = new StringBuilder().append(apiUrl).append("?").append(TableSupport.PAGE_NUM).append("=").append(pageNum).append("&").append(TableSupport.PAGE_SIZE).append("=").append(PAGE_SIZE).toString();
            ResponseEntity<TableDataInfo> responseEntity =
                    restTemplate.exchange(url, HttpMethod.GET, new HttpEntity<String>(httpHeaders), TableDataInfo.class);
            TableDataInfo tableDataInfo = responseEntity.getBody();
            total = tableDataInfo.getTotal();
            totalPages = (int) (total % PAGE_SIZE == 0 ? total / PAGE_SIZE : total / PAGE_SIZE + 1);
            log.info("result code:{}, message:{}", tableDataInfo.getCode(), tableDataInfo.getMsg());
            if (tableDataInfo.getCode() == HttpStatus.SUCCESS && total > 0) {
                List<T> list = (List<T>) tableDataInfo.getRows();
                callback.doInService(list);
                if (log.isDebugEnabled()) {
                    className = list.get(0).getClass().getSimpleName();
                    log.debug("导入第{}页数据{}条", pageNum, list.size());
                }
            } else {
                throw new ApiException("获取数据失败，" + tableDataInfo.getMsg(), tableDataInfo.getCode());
            }
            pageNum++;
        } while (pageNum <= totalPages);
        if (log.isInfoEnabled()) {
            log.info("导入{}数据总记录数{}，总页数{}", className, total, totalPages);
        }
    }

    /**
     * 使用bulkIndex方式导入数据到es，
     *
     * @param elasticsearchOperations 模板对象
     * @param dataList              实现了EsIdable接口的实体类集合
     * @param uuid                  true使用uuid做主键，否则使用es对象的id做主键
     */
    public void bulkIndex(ElasticsearchOperations elasticsearchOperations, List<T> dataList, boolean uuid) {
        int counter = 0;
        List<IndexQuery> indexQueries = new ArrayList<>();
        for (T item : dataList) {
            IndexQuery indexQuery = new IndexQueryBuilder().withId(
                    uuid ? UUID.fastUUID().toString() : ((EsIdable) item).getEsId())
                    .withObject(item).build();
            indexQueries.add(indexQuery);
            if (counter % BULK_SIZE == 0) {
                elasticsearchOperations.bulkIndex(indexQueries);
                indexQueries.clear();
                log.info("bulkIndex counter:{}", counter);
            }
            counter++;
        }
        if (indexQueries.size() > 0) {
            elasticsearchOperations.bulkIndex(indexQueries);
        }
        log.info("bulkIndex completed.");
    }
}
